package com.googlecode.gaal.analysis.impl;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import com.googlecode.gaal.analysis.api.Filter;
import com.googlecode.gaal.analysis.api.IntervalSetBuilder;
import com.googlecode.gaal.data.api.IntervalSet;
import com.googlecode.gaal.data.api.SymbolTable;
import com.googlecode.gaal.suffix.api.EmbeddedSuffixTree.EmbeddedInterval;
import com.googlecode.gaal.suffix.api.IntervalTree.Interval;
import com.googlecode.gaal.suffix.api.LinearizedSuffixTree;
import com.googlecode.gaal.suffix.api.LinearizedSuffixTree.BinaryInterval;

public class ConcurrentEmbeddedContextExtractor implements Iterable<EmbeddedInterval> {
    private final static int MAX_THREADS = 20;
    private final IntervalSet<BinaryInterval> intervalSet;
    private final EmbeddedIntervalExtractor intervalContextExtractor;

    public ConcurrentEmbeddedContextExtractor(LinearizedSuffixTree lst, SymbolTable<?> symbolTable,
            IntervalSetBuilder intervalSetBuilder, Filter<EmbeddedInterval> contextFilter, int windowSize) {
        intervalSet = intervalSetBuilder.buildIntervalSet(lst);
        this.intervalContextExtractor = new EmbeddedIntervalExtractor(lst, symbolTable, intervalSetBuilder,
                contextFilter, windowSize);
    }

    @Override
    public Iterator<EmbeddedInterval> iterator() {
        return new ContextIterator();
    }

    private class ContextIterator implements Iterator<EmbeddedInterval> {
        private final Iterator<BinaryInterval> iterator = intervalSet.iterator();
        private Iterator<EmbeddedInterval> intervalContextIterator;
        private ThreadGroup threads = new ThreadGroup("group");
        private LinkedBlockingQueue<Iterator<EmbeddedInterval>> queue = new LinkedBlockingQueue<Iterator<EmbeddedInterval>>();
        private EmbeddedInterval next = advance();

        private ContextIterator() {
            advance();
        }

        @Override
        public boolean hasNext() {
            return next != null;
        }

        @Override
        public EmbeddedInterval next() {
            if (next == null)
                throw new NoSuchElementException();
            EmbeddedInterval context = next;
            next = advance();
            return context;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        private EmbeddedInterval advance() {
            while (intervalContextIterator == null || !intervalContextIterator.hasNext()) {
                startThreads();
                try {
                    intervalContextIterator = queue.poll(10, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (threads.activeCount() == 0 && queue.isEmpty()) {
                    return null;
                }
            }
            return intervalContextIterator.next();
        }

        private void startThreads() {
            while (threads.activeCount() < MAX_THREADS && iterator.hasNext()) {
                final Interval interval = iterator.next();
                new Thread(threads, new Runnable() {
                    @Override
                    public void run() {
                        queue.add(intervalContextExtractor.iterator(interval));
                    }
                }).start();
            }
        }
    };
}
